sqlstore.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. package sqlstore
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "strings"
  10. "testing"
  11. "time"
  12. "github.com/grafana/grafana/pkg/bus"
  13. "github.com/grafana/grafana/pkg/log"
  14. m "github.com/grafana/grafana/pkg/models"
  15. "github.com/grafana/grafana/pkg/registry"
  16. "github.com/grafana/grafana/pkg/services/annotations"
  17. "github.com/grafana/grafana/pkg/services/cache"
  18. "github.com/grafana/grafana/pkg/services/sqlstore/migrations"
  19. "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
  20. "github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
  21. "github.com/grafana/grafana/pkg/setting"
  22. "github.com/go-sql-driver/mysql"
  23. "github.com/go-xorm/xorm"
  24. _ "github.com/grafana/grafana/pkg/tsdb/mssql"
  25. _ "github.com/lib/pq"
  26. sqlite3 "github.com/mattn/go-sqlite3"
  27. )
  28. var (
  29. x *xorm.Engine
  30. dialect migrator.Dialect
  31. sqlog log.Logger = log.New("sqlstore")
  32. )
  33. const ContextSessionName = "db-session"
  34. func init() {
  35. registry.Register(&registry.Descriptor{
  36. Name: "SqlStore",
  37. Instance: &SqlStore{},
  38. InitPriority: registry.High,
  39. })
  40. }
  41. type SqlStore struct {
  42. Cfg *setting.Cfg `inject:""`
  43. Bus bus.Bus `inject:""`
  44. CacheService *cache.CacheService `inject:""`
  45. dbCfg DatabaseConfig
  46. engine *xorm.Engine
  47. log log.Logger
  48. Dialect migrator.Dialect
  49. skipEnsureAdmin bool
  50. }
  51. // NewSession returns a new DBSession
  52. func (ss *SqlStore) NewSession() *DBSession {
  53. return &DBSession{Session: ss.engine.NewSession()}
  54. }
  55. // WithDbSession calls the callback with an session attached to the context.
  56. func (ss *SqlStore) WithDbSession(ctx context.Context, callback dbTransactionFunc) error {
  57. sess, err := startSession(ctx, ss.engine, false)
  58. if err != nil {
  59. return err
  60. }
  61. return callback(sess)
  62. }
  63. // WithTransactionalDbSession calls the callback with an session within a transaction
  64. func (ss *SqlStore) WithTransactionalDbSession(ctx context.Context, callback dbTransactionFunc) error {
  65. return ss.inTransactionWithRetryCtx(ctx, callback, 0)
  66. }
  67. func (ss *SqlStore) inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error {
  68. sess, err := startSession(ctx, ss.engine, true)
  69. if err != nil {
  70. return err
  71. }
  72. defer sess.Close()
  73. err = callback(sess)
  74. // special handling of database locked errors for sqlite, then we can retry 3 times
  75. if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
  76. if sqlError.Code == sqlite3.ErrLocked {
  77. sess.Rollback()
  78. time.Sleep(time.Millisecond * time.Duration(10))
  79. sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
  80. return ss.inTransactionWithRetryCtx(ctx, callback, retry+1)
  81. }
  82. }
  83. if err != nil {
  84. sess.Rollback()
  85. return err
  86. } else if err = sess.Commit(); err != nil {
  87. return err
  88. }
  89. if len(sess.events) > 0 {
  90. for _, e := range sess.events {
  91. if err = bus.Publish(e); err != nil {
  92. log.Error(3, "Failed to publish event after commit. error: %v", err)
  93. }
  94. }
  95. }
  96. return nil
  97. }
  98. func (ss *SqlStore) Init() error {
  99. ss.log = log.New("sqlstore")
  100. ss.readConfig()
  101. engine, err := ss.getEngine()
  102. if err != nil {
  103. return fmt.Errorf("Fail to connect to database: %v", err)
  104. }
  105. ss.engine = engine
  106. ss.Dialect = migrator.NewDialect(ss.engine)
  107. // temporarily still set global var
  108. x = engine
  109. dialect = ss.Dialect
  110. migrator := migrator.NewMigrator(x)
  111. migrations.AddMigrations(migrator)
  112. for _, descriptor := range registry.GetServices() {
  113. sc, ok := descriptor.Instance.(registry.DatabaseMigrator)
  114. if ok {
  115. sc.AddMigration(migrator)
  116. }
  117. }
  118. if err := migrator.Start(); err != nil {
  119. return fmt.Errorf("Migration failed err: %v", err)
  120. }
  121. // Init repo instances
  122. annotations.SetRepository(&SqlAnnotationRepo{})
  123. ss.Bus.SetTransactionManager(ss)
  124. // Register handlers
  125. ss.addUserQueryAndCommandHandlers()
  126. // ensure admin user
  127. if ss.skipEnsureAdmin {
  128. return nil
  129. }
  130. return ss.ensureAdminUser()
  131. }
  132. func (ss *SqlStore) ensureAdminUser() error {
  133. systemUserCountQuery := m.GetSystemUserCountStatsQuery{}
  134. err := ss.InTransaction(context.Background(), func(ctx context.Context) error {
  135. err := bus.DispatchCtx(ctx, &systemUserCountQuery)
  136. if err != nil {
  137. return fmt.Errorf("Could not determine if admin user exists: %v", err)
  138. }
  139. if systemUserCountQuery.Result.Count > 0 {
  140. return nil
  141. }
  142. cmd := m.CreateUserCommand{}
  143. cmd.Login = setting.AdminUser
  144. cmd.Email = setting.AdminUser + "@localhost"
  145. cmd.Password = setting.AdminPassword
  146. cmd.IsAdmin = true
  147. if err := bus.DispatchCtx(ctx, &cmd); err != nil {
  148. return fmt.Errorf("Failed to create admin user: %v", err)
  149. }
  150. ss.log.Info("Created default admin", "user", setting.AdminUser)
  151. return nil
  152. })
  153. return err
  154. }
  155. func (ss *SqlStore) buildConnectionString() (string, error) {
  156. cnnstr := ss.dbCfg.ConnectionString
  157. // special case used by integration tests
  158. if cnnstr != "" {
  159. return cnnstr, nil
  160. }
  161. switch ss.dbCfg.Type {
  162. case migrator.MYSQL:
  163. protocol := "tcp"
  164. if strings.HasPrefix(ss.dbCfg.Host, "/") {
  165. protocol = "unix"
  166. }
  167. cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
  168. ss.dbCfg.User, ss.dbCfg.Pwd, protocol, ss.dbCfg.Host, ss.dbCfg.Name)
  169. if ss.dbCfg.SslMode == "true" || ss.dbCfg.SslMode == "skip-verify" {
  170. tlsCert, err := makeCert("custom", ss.dbCfg)
  171. if err != nil {
  172. return "", err
  173. }
  174. mysql.RegisterTLSConfig("custom", tlsCert)
  175. cnnstr += "&tls=custom"
  176. }
  177. if ss.dbCfg.ExtraConnectionStringArgs != "" {
  178. cnnstr += "&" + ss.dbCfg.ExtraConnectionStringArgs
  179. }
  180. case migrator.POSTGRES:
  181. var host, port = "127.0.0.1", "5432"
  182. fields := strings.Split(ss.dbCfg.Host, ":")
  183. if len(fields) > 0 && len(strings.TrimSpace(fields[0])) > 0 {
  184. host = fields[0]
  185. }
  186. if len(fields) > 1 && len(strings.TrimSpace(fields[1])) > 0 {
  187. port = fields[1]
  188. }
  189. if ss.dbCfg.Pwd == "" {
  190. ss.dbCfg.Pwd = "''"
  191. }
  192. if ss.dbCfg.User == "" {
  193. ss.dbCfg.User = "''"
  194. }
  195. cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", ss.dbCfg.User, ss.dbCfg.Pwd, host, port, ss.dbCfg.Name, ss.dbCfg.SslMode, ss.dbCfg.ClientCertPath, ss.dbCfg.ClientKeyPath, ss.dbCfg.CaCertPath)
  196. if ss.dbCfg.ExtraConnectionStringArgs != "" {
  197. cnnstr += " " + ss.dbCfg.ExtraConnectionStringArgs
  198. }
  199. case migrator.SQLITE:
  200. // special case for tests
  201. if !filepath.IsAbs(ss.dbCfg.Path) {
  202. ss.dbCfg.Path = filepath.Join(ss.Cfg.DataPath, ss.dbCfg.Path)
  203. }
  204. os.MkdirAll(path.Dir(ss.dbCfg.Path), os.ModePerm)
  205. cnnstr = fmt.Sprintf("file:%s?cache=%s&mode=rwc", ss.dbCfg.Path, ss.dbCfg.CacheMode)
  206. if ss.dbCfg.ExtraConnectionStringArgs != "" {
  207. cnnstr += "&" + ss.dbCfg.ExtraConnectionStringArgs
  208. }
  209. default:
  210. return "", fmt.Errorf("Unknown database type: %s", ss.dbCfg.Type)
  211. }
  212. return cnnstr, nil
  213. }
  214. func (ss *SqlStore) getEngine() (*xorm.Engine, error) {
  215. connectionString, err := ss.buildConnectionString()
  216. if err != nil {
  217. return nil, err
  218. }
  219. sqlog.Info("Connecting to DB", "dbtype", ss.dbCfg.Type)
  220. engine, err := xorm.NewEngine(ss.dbCfg.Type, connectionString)
  221. if err != nil {
  222. return nil, err
  223. }
  224. engine.SetMaxOpenConns(ss.dbCfg.MaxOpenConn)
  225. engine.SetMaxIdleConns(ss.dbCfg.MaxIdleConn)
  226. engine.SetConnMaxLifetime(time.Second * time.Duration(ss.dbCfg.ConnMaxLifetime))
  227. // configure sql logging
  228. debugSql := ss.Cfg.Raw.Section("database").Key("log_queries").MustBool(false)
  229. if !debugSql {
  230. engine.SetLogger(&xorm.DiscardLogger{})
  231. } else {
  232. engine.SetLogger(NewXormLogger(log.LvlInfo, log.New("sqlstore.xorm")))
  233. engine.ShowSQL(true)
  234. engine.ShowExecTime(true)
  235. }
  236. return engine, nil
  237. }
  238. func (ss *SqlStore) readConfig() {
  239. sec := ss.Cfg.Raw.Section("database")
  240. cfgURL := sec.Key("url").String()
  241. if len(cfgURL) != 0 {
  242. dbURL, _ := url.Parse(cfgURL)
  243. ss.dbCfg.Type = dbURL.Scheme
  244. ss.dbCfg.Host = dbURL.Host
  245. pathSplit := strings.Split(dbURL.Path, "/")
  246. if len(pathSplit) > 1 {
  247. ss.dbCfg.Name = pathSplit[1]
  248. }
  249. userInfo := dbURL.User
  250. if userInfo != nil {
  251. ss.dbCfg.User = userInfo.Username()
  252. ss.dbCfg.Pwd, _ = userInfo.Password()
  253. }
  254. } else {
  255. ss.dbCfg.Type = sec.Key("type").String()
  256. ss.dbCfg.Host = sec.Key("host").String()
  257. ss.dbCfg.Name = sec.Key("name").String()
  258. ss.dbCfg.User = sec.Key("user").String()
  259. ss.dbCfg.ConnectionString = sec.Key("connection_string").String()
  260. ss.dbCfg.Pwd = sec.Key("password").String()
  261. }
  262. ss.dbCfg.MaxOpenConn = sec.Key("max_open_conn").MustInt(0)
  263. ss.dbCfg.MaxIdleConn = sec.Key("max_idle_conn").MustInt(2)
  264. ss.dbCfg.ConnMaxLifetime = sec.Key("conn_max_lifetime").MustInt(14400)
  265. ss.dbCfg.SslMode = sec.Key("ssl_mode").String()
  266. ss.dbCfg.CaCertPath = sec.Key("ca_cert_path").String()
  267. ss.dbCfg.ClientKeyPath = sec.Key("client_key_path").String()
  268. ss.dbCfg.ClientCertPath = sec.Key("client_cert_path").String()
  269. ss.dbCfg.ServerCertName = sec.Key("server_cert_name").String()
  270. ss.dbCfg.Path = sec.Key("path").MustString("data/grafana.db")
  271. ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
  272. ss.dbCfg.ExtraConnectionStringArgs = sec.Key("extra_connection_string_args").String()
  273. }
  274. func InitTestDB(t *testing.T) *SqlStore {
  275. t.Helper()
  276. sqlstore := &SqlStore{}
  277. sqlstore.skipEnsureAdmin = true
  278. sqlstore.Bus = bus.New()
  279. sqlstore.CacheService = cache.New(5*time.Minute, 10*time.Minute)
  280. dbType := migrator.SQLITE
  281. // environment variable present for test db?
  282. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  283. dbType = db
  284. }
  285. // set test db config
  286. sqlstore.Cfg = setting.NewCfg()
  287. sec, _ := sqlstore.Cfg.Raw.NewSection("database")
  288. sec.NewKey("type", dbType)
  289. switch dbType {
  290. case "mysql":
  291. sec.NewKey("connection_string", sqlutil.TestDB_Mysql.ConnStr)
  292. case "postgres":
  293. sec.NewKey("connection_string", sqlutil.TestDB_Postgres.ConnStr)
  294. default:
  295. sec.NewKey("connection_string", sqlutil.TestDB_Sqlite3.ConnStr)
  296. }
  297. // need to get engine to clean db before we init
  298. engine, err := xorm.NewEngine(dbType, sec.Key("connection_string").String())
  299. if err != nil {
  300. t.Fatalf("Failed to init test database: %v", err)
  301. }
  302. sqlstore.Dialect = migrator.NewDialect(engine)
  303. // temp global var until we get rid of global vars
  304. dialect = sqlstore.Dialect
  305. if err := dialect.CleanDB(); err != nil {
  306. t.Fatalf("Failed to clean test db %v", err)
  307. }
  308. if err := sqlstore.Init(); err != nil {
  309. t.Fatalf("Failed to init test database: %v", err)
  310. }
  311. sqlstore.engine.DatabaseTZ = time.UTC
  312. sqlstore.engine.TZLocation = time.UTC
  313. return sqlstore
  314. }
  315. func IsTestDbMySql() bool {
  316. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  317. return db == migrator.MYSQL
  318. }
  319. return false
  320. }
  321. func IsTestDbPostgres() bool {
  322. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  323. return db == migrator.POSTGRES
  324. }
  325. return false
  326. }
  327. type DatabaseConfig struct {
  328. Type string
  329. Host string
  330. Name string
  331. User string
  332. Pwd string
  333. Path string
  334. SslMode string
  335. CaCertPath string
  336. ClientKeyPath string
  337. ClientCertPath string
  338. ServerCertName string
  339. ConnectionString string
  340. MaxOpenConn int
  341. MaxIdleConn int
  342. ConnMaxLifetime int
  343. CacheMode string
  344. ExtraConnectionStringArgs string
  345. }